Data Source: https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant

Features consist of hourly average ambient variables

Temperature (T) in the range 1.81°C to 37.11°C
Ambient Pressure (AP) in the range 992.89 to 1033.30 millibar
Relative Humidity (RH) in the range 25.56% to 100.16%
Exhaust Vacuum (V) in the range 25.36 to 81.56 cm Hg
Net hourly electrical energy output (EP) in the range 420.26 to 495.76 MW

The averages are taken from various sensors located around the plant that record the ambient variables every second. The variables are given without normalization.

Dataset Information:

The dataset contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011), during which the plant was set to work at full load. The features are hourly average ambient variables: Temperature (T), Ambient Pressure (AP), Relative Humidity (RH), and Exhaust Vacuum (V); the target is the plant's net hourly electrical energy output (EP). A combined cycle power plant (CCPP) is composed of gas turbines (GT), steam turbines (ST), and heat recovery steam generators. In a CCPP, electricity is generated by gas and steam turbines combined in one cycle, with heat transferred from one turbine to another. While the Vacuum is collected from, and affects, the Steam Turbine, the other three ambient variables affect the GT performance.


In [1]:
!ls -ltr /data


total 1584948
drwxr-xr-x 2 training training      4096 Jun  5  2009 cifar-10-batches-py
-rw-r--r-- 1 training training 150828752 Nov  5  2016 creditcard-fraud.csv
-rw-r--r-- 1 training training    133638 Jun  6  2017 credit-default.csv
-rw-r--r-- 1 training training   1215506 Jun 18  2017 CleanCreditScoring.csv
-rw-r--r-- 1 training training     57459 Sep 12  2017 istanbul-stock.csv
-rw-r--r-- 1 training training      2249 Nov  8 07:15 story.txt
-rw-r--r-- 1 training training 127744095 Nov  8 07:16 stocks.csv
-rw-r--r-- 1 training training       160 Nov  8 07:16 poem.txt
-rw-r--r-- 1 training training  69055807 Nov  8 07:16 imdb-comments.json
-rw-r--r-- 1 training training 526915613 Nov  8 07:18 tweets.json
drwxr-xr-x 2 training training      4096 Nov  8 07:18 MNIST
drwxr-xr-x 2 training training      4096 Nov  8 07:18 weblogs
drwxr-xr-x 2 training training      4096 Dec  1 20:05 kaggle-house-price
-rw-r--r-- 1 training training   2772143 Dec  2 13:46 diamonds.csv
-rw-rw-r-- 1 training training      5107 Feb 19 00:12 iris.csv
-rw-r--r-- 1 training training   1543414 Mar 14 20:44 Combined_Cycle_Power_Plant.csv
-rw-r--r-- 1 training training 742579829 Mar 16 10:34 kddcup.data
-rw-rw-r-- 1 training training     19130 Mar 16 12:38 gdp.csv
drwxr-xr-x 2 training training      4096 Apr  2 13:12 movielens
-rw-rw-r-- 1 training training     54292 Apr  2 13:12 insurance.csv

In [2]:
spark


Out[2]:

SparkSession - hive

SparkContext
Version: v2.3.0
Master: local[*]
AppName: PySparkShell

Load Data


In [3]:
df = spark.read.format("csv").option("header","true")\
.option("inferSchema","true").load("/data/Combined_Cycle_Power_Plant.csv")

In [4]:
df.show()


+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    EP|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
|26.27|59.44|1012.23|58.77|443.67|
|15.89|43.96|1014.02|75.24|467.35|
| 9.48|44.71|1019.12|66.43|478.42|
|14.64| 45.0|1021.78|41.25|475.98|
|11.74|43.56|1015.14|70.72| 477.5|
|17.99|43.72|1008.64|75.04|453.02|
|20.14|46.93|1014.66|64.22|453.99|
|24.34| 73.5|1011.31|84.15|440.29|
|25.71|58.59|1012.77|61.83|451.28|
|26.19|69.34|1009.48|87.59|433.99|
|21.42|43.79|1015.76|43.08|462.19|
|18.21| 45.0|1022.86|48.84|467.54|
|11.04|41.74| 1022.6|77.51| 477.2|
|14.45|52.75|1023.97|63.59|459.85|
|13.97|38.47|1015.15|55.28| 464.3|
+-----+-----+-------+-----+------+
only showing top 20 rows


In [5]:
df.cache()


Out[5]:
DataFrame[AT: double, V: double, AP: double, RH: double, EP: double]
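
Note that cache() is lazy: it only marks the DataFrame for caching. A minimal sketch of forcing materialization with an action:

df.count()  # the first action materializes the cache; later queries read from memory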

Convert Spark DataFrame to Pandas DataFrame


In [6]:
df.limit(10).toPandas().head()


Out[6]:
AT V AP RH EP
0 14.96 41.76 1024.07 73.17 463.26
1 25.18 62.96 1020.04 59.08 444.37
2 5.11 39.40 1012.16 92.14 488.56
3 20.86 57.32 1010.24 76.64 446.48
4 10.82 37.50 1009.23 96.62 473.90
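
toPandas() collects the entire DataFrame onto the driver, which is why limit(10) is applied first. For a quick look at a dataset too large to collect, sampling is a safer sketch:

df.sample(False, 0.01, 42).toPandas()  # ~1% random sample with a fixed seed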

Vectorize the features


In [7]:
from pyspark.ml.feature import *

In [8]:
vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT", "V", "AP", "RH"])
vectorizer.setOutputCol("features")

df_vect = vectorizer.transform(df)
df_vect.show(10, False)


+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |EP    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
|5.11 |39.4 |1012.16|92.14|488.56|[5.11,39.4,1012.16,92.14]  |
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010.24,76.64]|
|10.82|37.5 |1009.23|96.62|473.9 |[10.82,37.5,1009.23,96.62] |
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012.23,58.77]|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014.02,75.24]|
|9.48 |44.71|1019.12|66.43|478.42|[9.48,44.71,1019.12,66.43] |
|14.64|45.0 |1021.78|41.25|475.98|[14.64,45.0,1021.78,41.25] |
|11.74|43.56|1015.14|70.72|477.5 |[11.74,43.56,1015.14,70.72]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 10 rows
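
Equivalently, the setter calls above can be passed as constructor keyword arguments, a common idiom:

vectorizer = VectorAssembler(inputCols=["AT", "V", "AP", "RH"], outputCol="features")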


In [9]:
print(vectorizer.explainParams())


inputCols: input column names. (current: ['AT', 'V', 'AP', 'RH'])
outputCol: output column name. (default: VectorAssembler_4bd6aa0aa3917dd77da5__output, current: features)

Fit Linear Regression Model


In [10]:
from pyspark.ml.regression import LinearRegression

In [11]:
lr = LinearRegression()
print(lr.explainParams())


aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
regParam: regularization parameter (>= 0). (default: 0.0)
solver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (default: auto)
standardization: whether to standardize the training features before fitting the model. (default: True)
tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)

In [12]:
lr.setLabelCol("EP")
lr.setFeaturesCol("features")
model = lr.fit(df_vect)

In [13]:
type(model)


Out[13]:
pyspark.ml.regression.LinearRegressionModel

View model summary


In [14]:
print("R2:", model.summary.r2)
print("Intercept: ", model.intercept, "Coefficients", model.coefficients)


R2: 0.9286835997167648
Intercept:  454.5637357984046 Coefficients [-1.9773160434618613,-0.23402845649906473,0.06212776009866186,-0.15801655439825457]
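
The training summary exposes more diagnostics than r2. A minimal sketch; pValues is populated only when the normal solver is used (which solver="auto" selects here, since there is no regularization):

summary = model.summary
print("RMSE:", summary.rootMeanSquaredError)
print("MAE:", summary.meanAbsoluteError)
print("p-values:", summary.pValues)  # one per coefficient, plus the intercept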

Predict


In [15]:
df_pred = model.transform(df_vect)
df_pred.show()


+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    EP|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...| 467.2711634437306|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|444.07766858004396|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56251796945776|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5559716382537|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8267489278455|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3099415850402|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...|463.96591866241715|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....| 478.1739705793852|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....|472.04726822434776|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0492095425741|
|17.99|43.72|1008.64|75.04|453.02|[17.99,43.72,1008...| 459.5670777622559|
|20.14|46.93|1014.66|64.22|453.99|[20.14,46.93,1014...|456.64836515783395|
|24.34| 73.5|1011.31|84.15|440.29|[24.34,73.5,1011....| 438.7681037606262|
|25.71|58.59|1012.77|61.83|451.28|[25.71,58.59,1012...| 443.1661810913976|
|26.19|69.34|1009.48|87.59|433.99|[26.19,69.34,1009...| 435.4263567111474|
|21.42|43.79|1015.76|43.08|462.19|[21.42,43.79,1015...| 458.2610604716974|
|18.21| 45.0|1022.86|48.84|467.54|[18.21,45.0,1022....|463.85600228221267|
|11.04|41.74| 1022.6|77.51| 477.2|[11.04,41.74,1022...| 474.2498032497976|
|14.45|52.75|1023.97|63.59|459.85|[14.45,52.75,1023...| 467.2152077040968|
|13.97|38.47|1015.15|55.28| 464.3|[13.97,38.47,1015...|472.27139648674444|
+-----+-----+-------+-----+------+--------------------+------------------+
only showing top 20 rows

Evaluate


In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

In [17]:
evaluator = RegressionEvaluator()
print(evaluator.explainParams())


labelCol: label column name. (default: label)
metricName: metric name in evaluation - one of:
                       rmse - root mean squared error (default)
                       mse - mean squared error
                       r2 - r^2 metric
                       mae - mean absolute error. (default: rmse)
predictionCol: prediction column name. (default: prediction)

In [18]:
evaluator = RegressionEvaluator(labelCol = "EP", 
                                predictionCol = "prediction", 
                                metricName = "rmse")
evaluator.evaluate(df_pred)


Out[18]:
4.557525128298466
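
The same evaluator instance can report other metrics by overriding metricName per call:

evaluator.evaluate(df_pred, {evaluator.metricName: "r2"})   # should agree with model.summary.r2
evaluator.evaluate(df_pred, {evaluator.metricName: "mae"})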

Build a pipeline


In [19]:
from pyspark.ml.pipeline import Pipeline, PipelineModel

In [20]:
pipeline = Pipeline()
print(pipeline.explainParams())
pipeline.setStages([vectorizer, lr])
pipelineModel = pipeline.fit(df)


stages: a list of pipeline stages (undefined)

In [21]:
pipeline.getStages()


Out[21]:
[VectorAssembler_4bd6aa0aa3917dd77da5, LinearRegression_41e591e8b658af913914]

In [22]:
lr_model = pipelineModel.stages[1]
lr_model.coefficients


Out[22]:
DenseVector([-1.9773, -0.234, 0.0621, -0.158])

In [23]:
pipelineModel.transform(df).show()


+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    EP|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...| 467.2711634437306|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|444.07766858004396|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56251796945776|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5559716382537|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8267489278455|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3099415850402|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...|463.96591866241715|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....| 478.1739705793852|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....|472.04726822434776|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0492095425741|
|17.99|43.72|1008.64|75.04|453.02|[17.99,43.72,1008...| 459.5670777622559|
|20.14|46.93|1014.66|64.22|453.99|[20.14,46.93,1014...|456.64836515783395|
|24.34| 73.5|1011.31|84.15|440.29|[24.34,73.5,1011....| 438.7681037606262|
|25.71|58.59|1012.77|61.83|451.28|[25.71,58.59,1012...| 443.1661810913976|
|26.19|69.34|1009.48|87.59|433.99|[26.19,69.34,1009...| 435.4263567111474|
|21.42|43.79|1015.76|43.08|462.19|[21.42,43.79,1015...| 458.2610604716974|
|18.21| 45.0|1022.86|48.84|467.54|[18.21,45.0,1022....|463.85600228221267|
|11.04|41.74| 1022.6|77.51| 477.2|[11.04,41.74,1022...| 474.2498032497976|
|14.45|52.75|1023.97|63.59|459.85|[14.45,52.75,1023...| 467.2152077040968|
|13.97|38.47|1015.15|55.28| 464.3|[13.97,38.47,1015...|472.27139648674444|
+-----+-----+-------+-----+------+--------------------+------------------+
only showing top 20 rows


In [24]:
evaluator.evaluate(pipelineModel.transform(df))


Out[24]:
4.557525128298466

Save the pipeline to disk to persist the model


In [25]:
pipelineModel.save("/tmp/lr-pipeline")

In [26]:
!tree /tmp/lr-pipeline


/tmp/lr-pipeline
├── metadata
│   ├── part-00000
│   └── _SUCCESS
└── stages
    ├── 0_VectorAssembler_4bd6aa0aa3917dd77da5
    │   └── metadata
    │       ├── part-00000
    │       └── _SUCCESS
    └── 1_LinearRegression_41e591e8b658af913914
        ├── data
        │   ├── part-00000-49984b04-af0d-4925-b788-e989c20d1c66-c000.snappy.parquet
        │   └── _SUCCESS
        └── metadata
            ├── part-00000
            └── _SUCCESS

7 directories, 8 files
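
Note that save() raises an error if the target path already exists; when re-running the notebook, use the writer's overwrite mode:

pipelineModel.write().overwrite().save("/tmp/lr-pipeline")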

Load the persisted model from disk


In [27]:
saved_model = PipelineModel.load("/tmp/lr-pipeline")
saved_model.stages[1].coefficients


Out[27]:
DenseVector([-1.9773, -0.234, 0.0621, -0.158])

In [28]:
saved_model.transform(df).show()


+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    EP|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...| 467.2711634437306|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...|444.07766858004396|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56251796945776|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5559716382537|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8267489278455|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3099415850402|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...|463.96591866241715|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....| 478.1739705793852|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....|472.04726822434776|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0492095425741|
|17.99|43.72|1008.64|75.04|453.02|[17.99,43.72,1008...| 459.5670777622559|
|20.14|46.93|1014.66|64.22|453.99|[20.14,46.93,1014...|456.64836515783395|
|24.34| 73.5|1011.31|84.15|440.29|[24.34,73.5,1011....| 438.7681037606262|
|25.71|58.59|1012.77|61.83|451.28|[25.71,58.59,1012...| 443.1661810913976|
|26.19|69.34|1009.48|87.59|433.99|[26.19,69.34,1009...| 435.4263567111474|
|21.42|43.79|1015.76|43.08|462.19|[21.42,43.79,1015...| 458.2610604716974|
|18.21| 45.0|1022.86|48.84|467.54|[18.21,45.0,1022....|463.85600228221267|
|11.04|41.74| 1022.6|77.51| 477.2|[11.04,41.74,1022...| 474.2498032497976|
|14.45|52.75|1023.97|63.59|459.85|[14.45,52.75,1023...| 467.2152077040968|
|13.97|38.47|1015.15|55.28| 464.3|[13.97,38.47,1015...|472.27139648674444|
+-----+-----+-------+-----+------+--------------------+------------------+
only showing top 20 rows


In [29]:
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed = 200)
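
The weights are proportions and the fixed seed makes the split reproducible. A quick sanity check of the resulting sizes (sketch):

print(df_train.count(), df_test.count())  # roughly 70% / 30% of the 9568 rows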

In [30]:
pipelineModel = pipeline.fit(df_train)
evaluator.evaluate(pipelineModel.transform(df_test))


Out[30]:
4.563138184940591

Tune the model


In [31]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [32]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)

tuned_model = tvs.fit(vectorizer.transform(df_train))
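
TrainValidationSplit evaluates each parameter combination on a single 80/20 split, which is fast but noisier than k-fold cross-validation. A drop-in alternative, sketched here with 3 folds:

from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)
# cv_model = cv.fit(vectorizer.transform(df_train))  # roughly 3x the cost of TrainValidationSplit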

In [33]:
tuned_model.bestModel, tuned_model.validationMetrics


Out[33]:
(LinearRegression_41e591e8b658af913914,
 [5.039347932903683,
  5.636217630762049,
  5.107247882789134,
  4.547646522666319,
  4.545696090598205,
  4.545352062329218,
  5.038546732463139,
  5.49137539732243,
  5.628475999107409,
  4.545755899303294,
  4.5455883701354445,
  4.545438239526134])
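
The validation metrics line up index-for-index with the parameter grid, and since RMSE is minimized, the winning combination can be recovered with a sketch like this:

metrics = tuned_model.validationMetrics
best_idx = metrics.index(min(metrics))   # lower RMSE is better
tvs.getEstimatorParamMaps()[best_idx]    # the ParamMap that produced the best model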

In [34]:
df_test_pred = tuned_model.transform(vectorizer.transform(df_test))
df_test_pred.show()


+----+-----+-------+-----+------+--------------------+------------------+
|  AT|    V|     AP|   RH|    EP|            features|        prediction|
+----+-----+-------+-----+------+--------------------+------------------+
|1.81|39.42|1026.92|76.97|490.55|[1.81,39.42,1026....| 492.8349077427656|
|1.81|39.42|1026.92|76.97|490.55|[1.81,39.42,1026....| 492.8349077427656|
|1.81|39.42|1026.92|76.97|490.55|[1.81,39.42,1026....| 492.8349077427656|
|1.81|39.42|1026.92|76.97|490.55|[1.81,39.42,1026....| 492.8349077427656|
|2.34|39.42|1028.47|69.68|490.34|[2.34,39.42,1028....|492.94214880761615|
|2.58|39.42|1028.68|69.03|488.69|[2.58,39.42,1028....|492.58206703142434|
|2.58|39.42|1028.68|69.03|488.69|[2.58,39.42,1028....|492.58206703142434|
| 2.8|39.64|1011.01|82.96|482.66|[2.8,39.64,1011.0...| 489.0569119382538|
| 2.8|39.64|1011.01|82.96|482.66|[2.8,39.64,1011.0...| 489.0569119382538|
|3.21|38.44| 1016.9|86.34|491.35|[3.21,38.44,1016....| 488.4152185498315|
|3.21|38.44|1017.11|84.86|492.93|[3.21,38.44,1017....| 488.6394965835994|
|3.26|41.31| 996.32|100.0|489.38|[3.26,41.31,996.3...| 484.4525561174197|
|3.26|41.31| 996.32|100.0|489.38|[3.26,41.31,996.3...| 484.4525561174197|
|3.31|39.42|1024.05|84.31|487.19|[3.31,39.42,1024....| 488.7036922378125|
|3.31|39.42|1024.05|84.31|487.19|[3.31,39.42,1024....| 488.7036922378125|
|3.38|39.64| 1011.0|81.22|488.92|[3.38,39.64,1011....| 488.1800909364742|
|3.38|41.31| 998.79|97.76|489.11|[3.38,41.31,998.7...| 484.6874147567333|
|3.38|41.31| 998.79|97.76|489.11|[3.38,41.31,998.7...| 484.6874147567333|
| 3.4|39.64| 1011.1|83.43|459.86|[3.4,39.64,1011.1...| 487.8310275292321|
| 3.4|39.64| 1011.1|83.43|459.86|[3.4,39.64,1011.1...| 487.8310275292321|
+----+-----+-------+-----+------+--------------------+------------------+
only showing top 20 rows


In [35]:
evaluator.evaluate(df_test_pred)


Out[35]:
4.5683171800255895
